# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2.

from __future__ import absolute_import

import glob
import os
import shutil
import tempfile
import unittest

from sapling import util


# Short alias for the class under test.
atomictempfile = util.atomictempfile

# Deliberately long base name, so the tests have to deal with temporary
# filenames that go over the maximum file length.
filename = 253 * "A"

# Keep the name ``xrange`` usable on interpreters that do not define it.
try:
    xrange
except NameError:
    xrange = range


class testatomictempfile(unittest.TestCase):
    """Tests for ``util.atomictempfile``.

    Each test gets its own scratch directory (created in ``setUp`` and
    removed in ``tearDown``); every file the tests create lives inside it.
    """

    def setUp(self):
        self._testdir = tempfile.mkdtemp("atomictempfiletest")
        self._filename = os.path.join(self._testdir, filename)

    def tearDown(self):
        # ignore_errors=True: cleanup is best effort and must not mask the
        # outcome of the test itself.
        shutil.rmtree(self._testdir, True)

    # close() makes the write permanent and leaves no temp file behind
    def testsimple(self):
        atf = atomictempfile(self._filename)
        self.assertFalse(os.path.isfile(self._filename))
        tempfilename = atf._tempname
        self.assertIn(tempfilename, glob.glob(os.path.join(self._testdir, ".*")))

        atf.write(b"argh\n")
        atf.close()

        self.assertTrue(os.path.isfile(self._filename))
        # Check the temp file itself (and the same glob used above) instead
        # of a fixed name pattern that could never match it.
        self.assertFalse(os.path.exists(tempfilename))
        self.assertNotIn(
            tempfilename, glob.glob(os.path.join(self._testdir, ".*"))
        )

    # discard() removes the temp file without making the write permanent
    def testdiscard(self):
        atf = atomictempfile(self._filename)
        tempdir, basename = os.path.split(atf._tempname)

        atf.write(b"yo\n")
        atf.discard()

        self.assertFalse(os.path.isfile(self._filename))
        # Look in the directory that actually held the temp file, not in
        # whatever the current working directory happens to be.
        self.assertNotIn(basename, os.listdir(tempdir))

    # if a programmer screws up and passes bad args to atomictempfile, they
    # get a plain ordinary TypeError, not infinite recursion
    def testoops(self):
        with self.assertRaises(TypeError):
            atomictempfile()

    # checkambig=True avoids ambiguity of timestamp
    def testcheckambig(self):
        def atomicwrite(checkambig):
            f = atomictempfile(self._filename, checkambig=checkambig)
            f.writeutf8("FOO")
            f.close()

        # try some times, because reproduction of ambiguity depends on
        # "filesystem time"
        for _attempt in xrange(5):
            atomicwrite(False)
            oldstat = util.stat(self._filename)
            if oldstat.st_ctime != oldstat.st_mtime:
                # subsequent changing never causes ambiguity
                continue

            repetition = 3

            # repeat atomic write with checkambig=True, to examine
            # whether st_mtime is advanced multiple times as expected
            for _round in xrange(repetition):
                atomicwrite(True)
            newstat = util.stat(self._filename)
            if oldstat.st_ctime != newstat.st_ctime:
                # timestamp ambiguity was naturally avoided while repetition
                continue

            # st_mtime should be advanced "repetition" times, because
            # all atomicwrite() occurred at same time (in sec)
            self.assertEqual(
                newstat.st_mtime, (oldstat.st_mtime + repetition) & 0x7FFFFFFF
            )
            # no more examination is needed, if assumption above is true
            break
        else:
            # This platform seems too slow to examine anti-ambiguity
            # of file timestamp (or test happened to be executed at
            # bad timing). Exit silently in this case, because running
            # on other faster platforms can detect problems
            pass

    # a file opened for reading exposes the content of the existing file
    def testread(self):
        with open(self._filename, "wb") as f:
            f.write(b"foobar\n")
        atf = atomictempfile(self._filename, mode="rb")
        try:
            # assertEqual, not assertTrue: the second argument of assertTrue
            # is only a failure message, so it would never compare content.
            self.assertEqual(atf.read(), b"foobar\n")
        finally:
            atf.discard()

    def testcontextmanagersuccess(self):
        """When the context closes, the file is closed"""
        # Work inside the scratch directory so nothing is left in the cwd.
        target = os.path.join(self._testdir, "foo")
        with atomictempfile(target) as f:
            self.assertFalse(os.path.isfile(target))
            f.write(b"argh\n")
        self.assertTrue(os.path.isfile(target))

    def testcontextmanagerfailure(self):
        """On exception, the file is discarded"""
        # Work inside the scratch directory: a leftover "foo" from another
        # test or an earlier run must not influence this one.
        target = os.path.join(self._testdir, "foo")
        with self.assertRaises(ValueError):
            with atomictempfile(target) as f:
                self.assertFalse(os.path.isfile(target))
                f.write(b"argh\n")
                raise ValueError
        self.assertFalse(os.path.isfile(target))


if __name__ == "__main__":
    # Imported here so the runner is only required when this file is
    # executed directly as a script.
    from silenttestrunner import main as runtests

    runtests(__name__)
